package cn.com.wind.bdg.polish.polish.service;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import cn.com.wind.bdg.polish.polish.config.KafkaConfig;
import cn.com.wind.bdg.polish.polish.utils.PropertyFileUtil;

import java.util.*;
import java.util.concurrent.Future;

@Service
public class KafkaServiceImpl implements KafkaService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);

    /** Producer target topic. Was an inline literal in produce(); hoisted so it is visible at a glance. */
    private static final String PRODUCER_TOPIC = "TP_GPA_IMPORTDB_21732";

    /** How many records per partition to rewind to when resetting consumer offsets. */
    private static final long RESET_LAG_SIZE = 1_000_000L;

    @Autowired
    private KafkaConfig config;

    /** Set by reqClose() to ask the consume loop to exit; volatile for cross-thread visibility. */
    private volatile boolean reqClose;
    /** Flipped once close() has released both Kafka clients. */
    private volatile boolean closed;

    private Consumer<String, String> messageConsumer;
    private Producer<String, String> messageProducer;

    /**
     * Initializes the Kafka clients from the property files referenced by {@link KafkaConfig}:
     * lazily creates the producer (only when enabled in config) and the consumer, then
     * subscribes the consumer to the configured topic list.
     * <p>Not thread-safe; expected to be called once during application startup.</p>
     */
    @Override
    public void init() {
        LOGGER.info("初始化生产者..");
        if (messageProducer == null && config.isProducerEnable()) {
            messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
                    new StringSerializer(), new StringSerializer());
        }

        LOGGER.info("初始化消费者..");
        if (messageConsumer == null) {
            messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                    new StringDeserializer(), new StringDeserializer());
            messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
        }
    }

    /**
     * Sends one message synchronously to {@link #PRODUCER_TOPIC}, blocking until the broker
     * acknowledges the record. Blank values and a missing (disabled) producer are silently skipped.
     *
     * @param key   record key (may be null)
     * @param value record payload; blank values are ignored
     */
    @Override
    public void produce(String key, String value) {
        // Nothing to send for a blank payload.
        if (StringUtils.isBlank(value)) {
            return;
        }
        // Producer is only created when enabled in config — treat "disabled" as a no-op.
        if (messageProducer == null) {
            return;
        }
        // Synchronous send: send() is async, future.get() blocks for the broker ack.
        Future<RecordMetadata> future =
                messageProducer.send(new ProducerRecord<>(PRODUCER_TOPIC, key, value));
        try {
            RecordMetadata recordMetadata = future.get();
            // BUGFIX: metadata is only available on success. The old code read
            // recordMetadata.offset() outside the try, which threw an NPE whenever
            // get() failed and left recordMetadata null.
            long offset = recordMetadata.offset();
            int partition = recordMetadata.partition();
            LOGGER.debug("send ok, partition:{} offset:{}", partition, offset);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("生产发送错误:", e);
        } catch (Exception e) {
            LOGGER.error("生产发送错误:", e);
        }
    }

    /**
     * Polls Kafka in a loop until {@link #reqClose()} is called, dispatching each record to
     * {@link #consumeRecord(String, String)} and collecting the values. Releases the Kafka
     * clients via {@link #close()} before returning.
     *
     * @return all record values consumed before shutdown was requested
     */
    @Override
    public List<String> consume() {
        List<String> listKafka = new ArrayList<>();
        // Consecutive-error counter, used only to throttle error logging.
        long retry = 0;
        while (!reqClose) {
            try {
                // Pull the next batch of records (up to 1000 ms wait).
                ConsumerRecords<String, String> records = messageConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    if (reqClose) {
                        LOGGER.info("跳出消費..");
                        break;
                    }
                    String key = record.key();
                    String value = record.value();
                    consumeRecord(key, value);
                    listKafka.add(value);
                }
                if (config.isConsumerCommit()) {
                    messageConsumer.commitSync();
                }
                // Healthy iteration — reset the throttle so the next failure logs immediately.
                retry = 0;
            } catch (Exception ex) {
                // Throttle: with ~1 s polls this emits roughly one log line per minute
                // while Kafka is unreachable, instead of one per poll.
                if (retry % 60 == 0) {
                    LOGGER.error("Kafka Operation Failed", ex);
                }
                // BUGFIX: the counter was previously incremented only inside the if above,
                // so it stalled at 1 and no further errors were ever logged.
                retry++;
            }
        }
        // Shutdown requested — release Kafka resources.
        this.close();
        return listKafka;
    }

    /**
     * Business hook for a single consumed record; currently only logs it.
     *
     * @param key record key
     * @param msg record value
     */
    public void consumeRecord(String key, String msg) {
        LOGGER.info("接收到kafka消息，key:{},value:{}", key, msg);
    }

    /**
     * Rewinds the committed offset of every assigned partition to at most
     * {@link #RESET_LAG_SIZE} records before the log end (never before the log start),
     * then commits the new positions. Uses a dedicated short-lived consumer.
     */
    @Override
    public void resetOffset() {
        // BUGFIX: try-with-resources — the old code leaked this consumer (never closed).
        try (Consumer<String, String> consumer = new KafkaConsumer<>(
                PropertyFileUtil.load(config.getConsumerConfig()),
                new StringDeserializer(), new StringDeserializer())) {
            List<String> topics = config.getTopicLists(config.getConsumerTopic());
            for (String topic : topics) {
                consumer.subscribe(Collections.singletonList(topic));
                // Poll once so the group coordinator assigns partitions to this consumer.
                consumer.poll(2000);
                Set<TopicPartition> assignment = consumer.assignment();
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
                Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
                for (TopicPartition tp : assignment) {
                    long endOffset = endOffsets.get(tp);
                    long beginOffset = beginOffsets.get(tp);
                    long aimOffset = endOffset - RESET_LAG_SIZE;
                    // Rewind RESET_LAG_SIZE records, clamped to the earliest available offset.
                    if (aimOffset > 0 && aimOffset >= beginOffset) {
                        consumer.seek(tp, aimOffset);
                    } else {
                        consumer.seek(tp, beginOffset);
                    }
                }
                // Persist the seeked positions as the group's committed offsets.
                consumer.commitSync();
            }
        }
    }

    /** @return true once close() has released the Kafka clients */
    public boolean isClosed() {
        return closed;
    }

    /** Releases the consumer and producer; errors are logged and swallowed (best-effort shutdown). */
    private void close() {
        try {
            if (messageConsumer != null) {
                messageConsumer.close();
            }
            if (messageProducer != null) {
                messageProducer.close();
            }
        } catch (Exception ex) {
            // Was ex.printStackTrace() — route through the class logger instead.
            LOGGER.error("Kafka close failed", ex);
        }
        closed = true;
        LOGGER.info("Kafka資源釋放完畢！");
    }

    /** Asks the consume loop to stop; actual resource release happens inside consume(). */
    public void reqClose() {
        this.reqClose = true;
    }
}
